/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.runners.core.construction;



import com.google.auto.service.AutoService;

import com.bff.gaia.unified.sdk.coders.ByteArrayCoder;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.coders.DoubleCoder;

import com.bff.gaia.unified.sdk.coders.IterableCoder;

import com.bff.gaia.unified.sdk.coders.KvCoder;

import com.bff.gaia.unified.sdk.coders.LengthPrefixCoder;

import com.bff.gaia.unified.sdk.coders.StringUtf8Coder;

import com.bff.gaia.unified.sdk.coders.VarLongCoder;

import com.bff.gaia.unified.sdk.transforms.windowing.GlobalWindow;

import com.bff.gaia.unified.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;

import com.bff.gaia.unified.sdk.util.WindowedValue.FullWindowedValueCoder;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.BiMap;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableBiMap;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableMap;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Sets;



import java.util.Map;

import java.util.Set;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkState;



/** The {@link CoderTranslatorRegistrar} for coders which are shared across languages. */
@AutoService(CoderTranslatorRegistrar.class)
public class ModelCoderRegistrar implements CoderTranslatorRegistrar {

  /** Maps each model {@link Coder} class to its URN (and, being a {@link BiMap}, back again). */
  @VisibleForTesting
  static final BiMap<Class<? extends Coder>, String> BEAM_MODEL_CODER_URNS =
      ImmutableBiMap.<Class<? extends Coder>, String>builder()
          .put(ByteArrayCoder.class, ModelCoders.BYTES_CODER_URN)
          .put(StringUtf8Coder.class, ModelCoders.STRING_UTF8_CODER_URN)
          .put(KvCoder.class, ModelCoders.KV_CODER_URN)
          .put(VarLongCoder.class, ModelCoders.INT64_CODER_URN)
          .put(IntervalWindowCoder.class, ModelCoders.INTERVAL_WINDOW_CODER_URN)
          .put(IterableCoder.class, ModelCoders.ITERABLE_CODER_URN)
          .put(Timer.Coder.class, ModelCoders.TIMER_CODER_URN)
          .put(LengthPrefixCoder.class, ModelCoders.LENGTH_PREFIX_CODER_URN)
          .put(GlobalWindow.Coder.class, ModelCoders.GLOBAL_WINDOW_CODER_URN)
          .put(FullWindowedValueCoder.class, ModelCoders.WINDOWED_VALUE_CODER_URN)
          .put(DoubleCoder.class, ModelCoders.DOUBLE_CODER_URN)
          .build();

  /** The URN values registered in {@link #BEAM_MODEL_CODER_URNS}. */
  public static final Set<String> WELL_KNOWN_CODER_URNS = BEAM_MODEL_CODER_URNS.values();

  /** Maps each model {@link Coder} class to the {@link CoderTranslator} registered for it. */
  @VisibleForTesting
  static final Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> BEAM_MODEL_CODERS =
      ImmutableMap.<Class<? extends Coder>, CoderTranslator<? extends Coder>>builder()
          .put(ByteArrayCoder.class, CoderTranslators.atomic(ByteArrayCoder.class))
          .put(StringUtf8Coder.class, CoderTranslators.atomic(StringUtf8Coder.class))
          .put(VarLongCoder.class, CoderTranslators.atomic(VarLongCoder.class))
          .put(IntervalWindowCoder.class, CoderTranslators.atomic(IntervalWindowCoder.class))
          .put(GlobalWindow.Coder.class, CoderTranslators.atomic(GlobalWindow.Coder.class))
          .put(KvCoder.class, CoderTranslators.kv())
          .put(IterableCoder.class, CoderTranslators.iterable())
          .put(Timer.Coder.class, CoderTranslators.timer())
          .put(LengthPrefixCoder.class, CoderTranslators.lengthPrefix())
          .put(FullWindowedValueCoder.class, CoderTranslators.fullWindowedValue())
          .put(DoubleCoder.class, CoderTranslators.atomic(DoubleCoder.class))
          .build();

  // Validate the two registries against each other as soon as the class is initialized.
  // This block must stay below both map declarations: static initializers run in textual order.
  static {
    requireTranslatorForEveryModelCoder();
    requireJavaCoderForEveryModelUrn();
  }

  /** Checks that no coder class with a URN is missing from {@link #BEAM_MODEL_CODERS}. */
  private static void requireTranslatorForEveryModelCoder() {
    // View of the coder classes that have a URN but no translator; reported in the failure message.
    Set<Class<? extends Coder>> withoutTranslator =
        Sets.difference(BEAM_MODEL_CODER_URNS.keySet(), BEAM_MODEL_CODERS.keySet());
    checkState(
        withoutTranslator.isEmpty(),
        "Every Model %s must have an associated %s. Missing: %s",
        Coder.class.getSimpleName(),
        CoderTranslator.class.getSimpleName(),
        withoutTranslator);
  }

  /** Checks that {@link ModelCoders#urns()} equals the set of URNs registered in this class. */
  private static void requireJavaCoderForEveryModelUrn() {
    String coderName = Coder.class.getSimpleName();
    boolean urnsMatch = ModelCoders.urns().equals(BEAM_MODEL_CODER_URNS.values());
    checkState(urnsMatch, "All Model %ss should have an associated java %s", coderName, coderName);
  }

  /**
   * Reports whether the given coder is one of the model coders registered here.
   *
   * <p>The lookup uses the coder's exact runtime class as the key into
   * {@link #BEAM_MODEL_CODER_URNS}.
   *
   * @param coder the coder to look up
   * @return {@code true} if the coder's runtime class has a registered URN
   */
  public static boolean isKnownCoder(Coder<?> coder) {
    Class<?> coderClass = coder.getClass();
    return BEAM_MODEL_CODER_URNS.containsKey(coderClass);
  }

  /** Returns the coder-class-to-URN registry, {@link #BEAM_MODEL_CODER_URNS}. */
  @Override
  public Map<Class<? extends Coder>, String> getCoderURNs() {
    return BEAM_MODEL_CODER_URNS;
  }

  /** Returns the coder-class-to-translator registry, {@link #BEAM_MODEL_CODERS}. */
  @Override
  public Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getCoderTranslators() {
    return BEAM_MODEL_CODERS;
  }
}